/*
 *  Copyright 2009-2016 Weibo, Inc.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

package cn.uncode.rpc.transpor.netty;

import java.net.InetSocketAddress;

import cn.uncode.rpc.common.ChannelState;
import cn.uncode.rpc.common.CommonConstant;
import cn.uncode.rpc.common.log.Logger;
import cn.uncode.rpc.common.log.LoggerFactory;
import cn.uncode.rpc.core.URL;
import cn.uncode.rpc.core.URLParam;
import cn.uncode.rpc.stats.StatisticCallback;
import cn.uncode.rpc.transport.RequestHandler;
import cn.uncode.rpc.transport.netty2.NettyChannelInitializer;
import cn.uncode.rpc.transport.support.AbstractServer;
import cn.uncode.rpc.util.StandardThreadExecutor;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;




/**
 * 
 * <pre>
 * 		netty server 的相关优化
 * 		1） server 的 executor handler 相关防护
 * 		2） server 的 隔离保护，不同方法。
 * 		3） 线程池调优
 * 		4） client 请求drop （提供主动和被动策略）
 * 		5） 增加降级开关。
 * 		6） server 端的超时控制
 * 		7） 关注 OOM的问题
 * 		8） Queue 大小的设置 
 * 		9） server端接收包的大小限制
 * </pre>
 * 
 * @author maijunsheng
 * 
 */
public class NettyServer extends AbstractServer implements StatisticCallback {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
	
	private EventLoopGroup bossGroup;
	private EventLoopGroup workerGroup;
	private ChannelFuture channelFuture;
	
	// 单端口需要对应单executor 1) 为了更好的隔离性 2) 为了防止被动releaseExternalResources:
	private StandardThreadExecutor standardThreadExecutor = null;
	private RequestHandler requestHandler;

	public NettyServer(URL url, RequestHandler requestHandler) {
		super(url);
		this.requestHandler = requestHandler;
	}

	@Override
	public boolean start() {
		if (isAvailable()) {
			LOGGER.warn("NettyServer ServerChannel already Open: url=" + url);
			return true;
		}

		LOGGER.info("NettyServer ServerChannel start Open: url=" + url);
		
//		PortUtil.checkAvailablePort(url.getPort());
		ServerBootstrap bootstrap = initServerBootstrap();
		try {
			// start server
			channelFuture = bootstrap.bind(new InetSocketAddress(url.getPort())).sync();
			state = ChannelState.ALIVE;
			// register shutown hook
			Runtime.getRuntime().addShutdownHook(new ShutdownThread());
		} catch (Exception e) {
			LOGGER.error("Exception happen when start server", e);
		}

//		StatsUtil.registryStatisticCallback(this);
		LOGGER.info("NettyServer ServerChannel finish Open: url=" + url);

		return state.isAliveState();
	}

	private synchronized ServerBootstrap initServerBootstrap() {
		boolean shareChannel = url.getBooleanParameter(URLParam.SHARE_CHANNEL.getName(),
				URLParam.SHARE_CHANNEL.getBooleanValue());
		final int maxContentLength = url.getIntParameter(URLParam.MAX_CONTENT_LENGTH.getName(),
				URLParam.MAX_CONTENT_LENGTH.getIntValue());
		int maxServerConnection = url.getIntParameter(URLParam.MAX_SERVER_CONNECTION.getName(),
				URLParam.MAX_SERVER_CONNECTION.getIntValue());
		int workerQueueSize = url.getIntParameter(URLParam.WORKER_QUEUE_SIZE.getName(),
				URLParam.WORKER_QUEUE_SIZE.getIntValue());

		int minWorkerThread = 0, maxWorkerThread = 0;

		if (shareChannel) {
			minWorkerThread = url.getIntParameter(URLParam.MIN_WORKER_THREAD.getName(),
					CommonConstant.NETTY_SHARECHANNEL_MIN_WORKDER);
			maxWorkerThread = url.getIntParameter(URLParam.MAX_WORDER_THREAD.getName(),
					CommonConstant.NETTY_SHARECHANNEL_MAX_WORKDER);
		} else {
			minWorkerThread = url.getIntParameter(URLParam.MIN_WORKER_THREAD.getName(),
					CommonConstant.NETTY_NOT_SHARECHANNEL_MIN_WORKDER);
			maxWorkerThread = url.getIntParameter(URLParam.MAX_WORDER_THREAD.getName(),
					CommonConstant.NETTY_NOT_SHARECHANNEL_MAX_WORKDER);
		}

		standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
				: new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize,
						new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));
		standardThreadExecutor.prestartAllCoreThreads();
		
		bossGroup = new NioEventLoopGroup();
		workerGroup = new NioEventLoopGroup();
		ServerBootstrap b = new ServerBootstrap();
		// TODO 连接数的管理，进行最大连接数的限制 
		b.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.TCP_NODELAY, true)
				.option(ChannelOption.SO_TIMEOUT, 6000)
				.childOption(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, true)
				.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

		b.childHandler(new NettyChannelInitializer(url, requestHandler));
		return b;
	}
	
	
	class ShutdownThread extends Thread {
		@Override
		public void run() {
			NettyServer.this.stop();
		}
	}

	@Override
	public void stop() {
		if (state.isCloseState()) {
			LOGGER.info("NettyServer close fail: already close, url={}", url.getUri());
			return;
		}

		if (state.isUnInitState()) {
			LOGGER.info("NettyServer close Fail: don't need to close because node is unInit state: url={}",
					url.getUri());
			return;
		}

		try {
			// close listen socket
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
			// close all clients's channel
			if(channelFuture.channel() != null){
				channelFuture.channel().close();
			}
			// shutdown the threadPool
			standardThreadExecutor.shutdownNow();
			// 设置close状态
			state = ChannelState.CLOSE;
			// 取消统计回调的注册
//			StatsUtil.unRegistryStatisticCallback(this);
			LOGGER.info("NettyServer close Success: url={}", url.getUri());
		} catch (Exception e) {
			LOGGER.error("NettyServer close Error: url=" + url.getUri(), e);
		}
	}

	@Override
	public boolean isStop() {
		return state.isCloseState();
	}

	@Override
	public boolean isAvailable() {
		return state.isAliveState();
	}

	@Override
	public URL getUrl() {
		return url;
	}

	/**
	 * 统计回调接口
	 */
	@Override
	public String statisticCallback() {
		return String.format(
				"identity: %s connectionCount: %s taskCount: %s queueCount: %s maxThreadCount: %s maxTaskCount: %s",
				url.getIdentity(), "", standardThreadExecutor.getSubmittedTasksCount(),
				standardThreadExecutor.getQueue().size(), standardThreadExecutor.getMaximumPoolSize(),
				standardThreadExecutor.getMaxSubmittedTaskCount());
	}

	
}
